import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class JoinwithTemporalTable_rowtime
{

    public static void main(String[] args) throws Exception
    {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings sett = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, sett);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1L);
        //设定watermark的间隔

//--------------------------------------下面是orders数据------------------------------------------------------------------

//        原始流数据
        DataStream orderA = env.fromCollection(Arrays.asList(
                Tuple3.of("US Dollar", 102L , 1607690070878L),
                Tuple3.of("Euro", 102L, 1607690050878L),
                Tuple3.of("Euro", 102L, 1607690070878L)))
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(7)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Long> element) {
                        return element.f2;
                    }
                });
        streamTableEnv.createTemporaryView("Orders", orderA,
                $("o_currency"),
                $("o_rate"),
                $("o_proctime").rowtime());



//--------------------------------下面是rates数据-----------------------------------------------------------------
        List ratesHistoryData = new ArrayList<>();
        ratesHistoryData.add(Tuple3.of("Euro", 114L, 1607690060878L));
        ratesHistoryData.add(Tuple3.of("Euro", 116L, 1607690060878L));
        ratesHistoryData.add(Tuple3.of("Euro", 119L, 1607690060878L));
        DataStream ratesHistoryStream = env.fromCollection(ratesHistoryData).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(3))
        {

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Long> element)
                    {
                        return element.f2;
                    }
                });



        Table ratesHistory = streamTableEnv.fromDataStream(ratesHistoryStream,
                $("r_currency"),
                $("r_rate"),
                $("r_proctime").rowtime());

        streamTableEnv.createTemporaryView("RatesHistory", ratesHistory);
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
                "r_proctime",
                "r_currency");
        streamTableEnv.registerFunction("rates", rates);
//        streamTableEnv.toRetractStream(streamTableEnv.from("RatesHistory"), Row.class).print();
//        streamTableEnv.toRetractStream(streamTableEnv.from("Orders"), Row.class).print();

//--------------------------------------下面开始join----------------------------------------------------------------


        Table result = streamTableEnv.from("Orders")
                .joinLateral(
                        call("rates", $("o_proctime")),
                                $("o_currency").isEqual($("r_currency")));

        streamTableEnv.toRetractStream(result, Row.class).print();

        env.execute("");




    }

}

/*

Reference:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.scala
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala

*/